package org.example.service;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;

import cn.hutool.core.lang.UUID;
import lombok.extern.slf4j.Slf4j;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.example.service.consts.CommonConsts;
import org.example.util.ConfigLoader;
import org.example.util.ESClientUtils;
import org.example.util.KafkaUtils;
import org.example.util.MessageHandler;

import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;

/**
 * 弹幕消息Kafka消费者
 * 负责消费从Kafka接收的弹幕消息并进行处理
 */
@Slf4j
public class DanmuMessageConsumer implements MessageHandler {

    private KafkaUtils.GenericKafkaConsumer kafkaConsumer;

    private DanmuCsvWriter danmuCsvWriter;

    private static final String CONSUMER_GROUP_ID;
    private static final String DANMU_TOPIC;
    private static final String DANMU_WRITE_BASE_DIR;
    private static final String CONSUMER_ENABLED;
    private static final String ES_ENABLED;

    static {
        java.util.Properties kafkaProps = ConfigLoader.loadProperties("kafka.properties");
        DANMU_TOPIC = kafkaProps.getProperty("danmu.topic", "bilibili-danmu");
        CONSUMER_GROUP_ID = kafkaProps.getProperty("consumer.group.id", "danmu-consumer-group");
        java.util.Properties appProps = ConfigLoader.loadProperties("app.properties");
        DANMU_WRITE_BASE_DIR =appProps.getProperty("write.danmu.baseDir");
        CONSUMER_ENABLED = kafkaProps.getProperty("comsumer.enabled", "false");
        Properties properties = ConfigLoader.loadProperties("elasticsearch.properties");
        ES_ENABLED = properties.getProperty("es.enabled", "false");
    }

    /**
     * 初始化Kafka消费者
     */
    public DanmuMessageConsumer() {
        // 使用KafkaUtils创建消费者
        this.kafkaConsumer = KafkaUtils.startConsumer(
                Collections.singletonList(DANMU_TOPIC),
                CONSUMER_GROUP_ID,
                this);
        //创建文件写入writer
        this.danmuCsvWriter = new DanmuCsvWriter(DANMU_WRITE_BASE_DIR);
        log.info("弹幕消息消费者初始化完成, topic: {}, group: {}", DANMU_TOPIC, CONSUMER_GROUP_ID);
    }

    /**
     * 启动消费者（实际上消费者在构造函数中已经启动）
     */
    public void start() {
        log.info("弹幕消息消费者已启动");
        this.kafkaConsumer.start();
    }

    /**
     * 停止消费者
     */
    public void stop() {
        if (kafkaConsumer != null) {
            kafkaConsumer.stop();
            log.info("弹幕消息消费者已停止");
        }
    }

    /**
     * 实现MessageHandler接口的handle方法
     */
    @Override
    public void handle(String topic, String key, String value, int partition, long offset) {
        processDanmuMessage(value);
    }

    /**
     * 实现MessageHandler接口的handleError方法
     */
    @Override
    public void handleError(String topic, String key, String value, int partition, long offset, Exception exception) {
        log.error("处理弹幕消息失败, topic: {}, partition: {}, offset: {}, message: {}",
                topic, partition, offset, value, exception);
    }

    /**
     * 处理单条弹幕消息
     * 
     * @param messageJson 弹幕消息JSON字符串
     */
    private void processDanmuMessage(String messageJson) {
        try {
            JSONObject messageInfo = JSON.parseObject(messageJson);
            String cmd = messageInfo.getString("cmd");

            if ("DANMU_MSG".equals(cmd) && Objects.nonNull(messageInfo)) {
                // 解析并处理弹幕信息
                Optional.ofNullable(messageInfo.getJSONArray("info"))
                        .filter(x -> x.size() >= 10)
                        .ifPresent(info -> {
                            String danmuMsg = info.getString(1);
                            String userName = Optional.ofNullable(info.getJSONArray(2))
                                    .filter(userInfo -> userInfo.size() > 1)
                                    .map(userInfo -> userInfo.getString(1))
                                    .orElse("");
                            long timestamp = Optional.ofNullable(info.getJSONObject(9))
                                    .map(times -> times.getLong("ts"))
                                    .orElse(0L);

                            // 处理弹幕消息
                            handleDanmuMessage(userName, danmuMsg, timestamp, messageInfo);
                        });
            }
        } catch (Exception e) {
            log.error("解析弹幕消息JSON失败: {}", messageJson, e);
        }
    }

    /**
     * 处理解析后的弹幕消息
     * 
     * @param userName    用户名
     * @param danmuMsg    弹幕内容
     * @param timestamp   时间戳
     * @param fullMessage 完整消息对象
     */
    private void handleDanmuMessage(String userName, String danmuMsg, long timestamp, JSONObject fullMessage) {
        // 打印弹幕信息（从BiliBiliDanmuClient迁移过来的逻辑）
        String appName = fullMessage.getString(CommonConsts.DanMuMessageFieldKey.APP_KEY);
        String roomId = fullMessage.getString(CommonConsts.DanMuMessageFieldKey.ROOM_KEY);
        log.info("收到弹幕消息,应用:{},直播间id{},用户:{},内容:{},时间戳:{}", appName, roomId, userName, danmuMsg, timestamp);

        // 这里可以添加更多的处理逻辑，比如：
        // 1. 数据库存储
        // 2. 实时统计
        // 3. 敏感词过滤
        // 4. 用户行为分析
        // 5. 推送到其他系统

        // 示例：记录调试信息
        log.debug("弹幕详细信息: {}", fullMessage.toJSONString());
        Map<String, Object> danmuData = new HashMap<>();
        danmuData.put(CommonConsts.DanmuEntityFieldKey.APP_NAME_KEY, appName);
        danmuData.put(CommonConsts.DanmuEntityFieldKey.DANMU_MSG_KEY, danmuMsg);
        danmuData.put(CommonConsts.DanmuEntityFieldKey.DANMU_TIMESTAMP_KEY, timestamp);
        danmuData.put(CommonConsts.DanmuEntityFieldKey.DANMU_CREATE_DATE, new Date());
        danmuData.put(CommonConsts.DanmuEntityFieldKey.ROOM_ID_KEY, roomId);
        danmuData.put(CommonConsts.DanmuEntityFieldKey.USER_NICKNAME_KEY, userName);
        // 将弹幕推送到ES
        pushDanmuDataToES(danmuData);
        //保存弹幕到本地
        danmuCsvWriter.writeDanmu(danmuData);
    }

    private void pushDanmuDataToES(Map<String,Object> danmuData) {
        RestHighLevelClient client = ESClientUtils.getClient();

        // 创建index request
        IndexRequest indexRequest = new IndexRequest("danmu_index_2")
                .id(UUID.randomUUID().toString())
                .source(danmuData);
        
        // 发送请求，写入数据
        log.info("开始写数据到ES");
        try{
        IndexResponse resp = client.index(indexRequest, RequestOptions.DEFAULT);
        log.info("数据写入ES结束,resp={}",resp);
        }catch(IOException e){
            log.error("数据写入ES异常,",e);
        }
    }

    /**
     * 检查消费者是否正在运行
     * 
     * @return true if running
     */
    public boolean isRunning() {
        return kafkaConsumer != null && kafkaConsumer.isRunning();
    }

    /**
     * 主方法，用于测试
     */
    public static void main(String[] args) {
        DanmuMessageConsumer consumer = new DanmuMessageConsumer();

        // 添加关闭钩子
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("接收到关闭信号，正在停止消费者...");
            consumer.stop();
        }));

        // 启动消费者
        consumer.start();

        // 保持主线程运行
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.info("主线程被中断，停止消费者");
            consumer.stop();
        }
    }
}
